bitkeeper revision 1.1159.1.35 (4118ef6bM1GarM_w6aDNImKG_0GSRw)
authormjw@wray-m-3.hpl.hp.com <mjw@wray-m-3.hpl.hp.com>
Tue, 10 Aug 2004 15:53:15 +0000 (15:53 +0000)
committermjw@wray-m-3.hpl.hp.com <mjw@wray-m-3.hpl.hp.com>
Tue, 10 Aug 2004 15:53:15 +0000 (15:53 +0000)
Improve error handling for save, and compress the file.

tools/python/xen/xend/XendMigrate.py
tools/xfrd/xfrd.c

index 6eaf0d1fbae3c3f7682a5fd063afac4c6969d897..1ee32f90c07acfb027d2bd9e9d042702cfec9df5 100644 (file)
@@ -14,7 +14,8 @@ from twisted.internet.protocol import ClientFactory
 import sxp
 import XendDB
 import EventServer; eserver = EventServer.instance()
-
+from XendError import XendError
+        
 """The port for the migrate/save daemon xfrd."""
 XFRD_PORT = 8002
 
@@ -77,12 +78,17 @@ class XfrdClientFactory(ClientFactory):
 
     def clientConnectionFailed(self, connector, reason):
         print 'clientConnectionFailed>', 'connector=', connector, 'reason=', reason
+        self.xinfo.error(reason)
 
 class XfrdInfo:
     """Abstract class for info about a session with xfrd.
     Has subclasses for save and migrate.
     """
 
+    """Suspend timeout (seconds).
+    We set a timeout because suspending a domain can hang."""
+    timeout = 30
+
     def __init__(self):
         from xen.xend import XendDomain
         self.xd = XendDomain.instance()
@@ -91,19 +97,18 @@ class XfrdInfo:
         self.paused = {}
         
     def vmconfig(self):
-        print 'vmconfig>'
         dominfo = self.xd.domain_get(self.src_dom)
-        print 'vmconfig>', type(dominfo), dominfo
         if dominfo:
             val = sxp.to_string(dominfo.sxpr())
         else:
             val = None
-        print 'vmconfig<', 'val=', type(val), val
         return val
 
     def error(self, err):
+        print 'Error>', err
         self.state = 'error'
         if not self.deferred.called:
+            print 'Error> calling errback'
             self.deferred.errback(err)
 
     def dispatch(self, xfrd, val):
@@ -115,6 +120,7 @@ class XfrdInfo:
         def cberr(err):
             v = ['xfr.err', errno.EINVAL]
             sxp.show(v, out=xfrd.transport)
+            self.error(err)
 
         op = sxp.name(val)
         op = op.replace('.', '_')
@@ -144,10 +150,6 @@ class XfrdInfo:
         if not err: return
         self.error(err);
         xfrd.loseConnection()
-        #try:
-        #    self.xd.domain_unpause(self.src_dom)
-        #except:
-        #    print >>sys.stdout, "Error unpausing domain:", self.src_dom
         return None
 
     def xfr_progress(self, xfrd, val):
@@ -178,6 +180,8 @@ class XfrdInfo:
     def xfr_vm_suspend(self, xfrd, val):
         """Suspend a domain. Suspending takes time, so we return
         a Deferred that is called when the suspend completes.
+        Suspending can hang, so we set a timeout and fail if it
+        takes too long.
         """
         print 'xfr_vm_suspend>', val
         try:
@@ -185,17 +189,17 @@ class XfrdInfo:
             d = defer.Deferred()
             # Subscribe to 'suspended' events so we can tell when the
             # suspend completes. Subscribe to 'died' events so we can tell if
-            # the domain died.
+            # the domain died. Set a timeout and error handler so the subscriptions
+            # will be cleaned up if suspending hangs or there is an error.
             def onSuspended(e, v):
-                print 'onSuspended>', e, v
+                print 'xfr_vm_suspend>onSuspended>', e, v
                 if v[1] != vmid: return
                 subscribe(on=0)
                 d.callback(v)
                 
             def onDied(e, v):
-                print 'onDied>', e, v
+                print 'xfr_vm_suspend>onDied>', e, v
                 if v[1] != vmid: return
-                subscribe(on=0)
                 d.errback(XendError('Domain died'))
                 
             def subscribe(on=1):
@@ -206,9 +210,16 @@ class XfrdInfo:
                 action('xend.domain.suspended', onSuspended)
                 action('xend.domain.died', onDied)
 
+            def cberr(err):
+                print 'xfr_vm_suspend>cberr>', err
+                subscribe(on=0)
+                return err
+
             subscribe()
             val = self.xd.domain_shutdown(vmid, reason='suspend')
             self.suspended[vmid] = 1
+            d.addErrback(cberr)
+            d.setTimeout(self.timeout)
             return d
         except:
             val = errno.EINVAL
@@ -278,6 +289,7 @@ class XendMigrateInfo(XfrdInfo):
             eserver.inject('xend.migrate.ok', self.sxpr())
         else:
             self.state = 'error'
+            self.error(XendError("save failed"))
             eserver.inject('xend.migrate.error', self.sxpr())
 
 class XendSaveInfo(XfrdInfo):
@@ -320,6 +332,7 @@ class XendSaveInfo(XfrdInfo):
             eserver.inject('xend.save.ok', self.sxpr())
         else:
             self.state = 'error'
+            self.error(XendError("save failed"))
             eserver.inject('xend.save.error', self.sxpr())
     
 
index 301674b1fe6c94ac3059d4c1c7712de4e6ae320c..1b8664b7dabe0dbdc91b00ed329d5538ae259ef6 100644 (file)
@@ -19,6 +19,8 @@
 #include <getopt.h>
 #include <errno.h>
 #include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
 #include <time.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
@@ -33,6 +35,7 @@
 #include "file_stream.h"
 #include "string_stream.h"
 #include "lzi_stream.h"
+#include "gzip_stream.h"
 #include "sys_net.h"
 #include "sys_string.h"
 
@@ -46,7 +49,7 @@
 #include "select.h"
 
 #define MODULE_NAME "XFRD"
-#define DEBUG 1
+#define DEBUG 0
 #include "debug.h"
 
 /*
@@ -324,8 +327,7 @@ void set_defaults(Args *args){
 
 int stringof(Sxpr exp, char **s){
     int err = 0;
-    dprintf(">\n");
-    objprint(iostdout, exp, PRINT_TYPE); IOStream_print(iostdout, "\n");
+    //dprintf(">\n"); objprint(iostdout, exp, PRINT_TYPE); IOStream_print(iostdout, "\n");
     if(ATOMP(exp)){
         *s = atom_name(exp);
     } else if(STRINGP(exp)){
@@ -334,7 +336,7 @@ int stringof(Sxpr exp, char **s){
         err = -EINVAL;
         *s = NULL;
     }
-    dprintf("< err=%d s=%s\n", err, *s);
+    //dprintf("< err=%d s=%s\n", err, *s);
     return err;
 }
 
@@ -342,8 +344,7 @@ int intof(Sxpr exp, int *v){
     int err = 0;
     char *s;
     unsigned long l;
-    dprintf(">\n");
-    objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
+    //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
     if(INTP(exp)){
         *v = OBJ_INT(exp);
     } else {
@@ -353,7 +354,7 @@ int intof(Sxpr exp, int *v){
         *v = (int)l;
     }
  exit:
-    dprintf("< err=%d v=%d\n", err, *v);
+    //dprintf("< err=%d v=%d\n", err, *v);
     return err;
 }
 
@@ -361,8 +362,7 @@ int addrof(Sxpr exp, uint32_t *v){
     char *h;
     unsigned long a;
     int err = 0;
-    dprintf(">\n");
-    objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
+    //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
     err = stringof(exp, &h);
     if(err) goto exit;
     if(get_host_address(h, &a)){
@@ -371,15 +371,14 @@ int addrof(Sxpr exp, uint32_t *v){
     }
     *v = a;
   exit:
-    dprintf("< err=%d v=%x\n", err, *v);
+    //dprintf("< err=%d v=%x\n", err, *v);
     return err;
 }
 
 int portof(Sxpr exp, uint16_t *v){
     char *s;
     int err = 0;
-    dprintf(">\n");
-    objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
+    //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
     if(INTP(exp)){
         *v = get_ul(exp);
         *v = htons(*v);
@@ -395,7 +394,7 @@ int portof(Sxpr exp, uint16_t *v){
         *v = p;
     }
   exit:
-    dprintf("< err=%d v=%u\n", err, *v);
+    //dprintf("< err=%d v=%u\n", err, *v);
     return err;
 }
 
@@ -468,7 +467,7 @@ int xfr_hello(Conn *conn){
     err = Conn_sxpr(conn, &sxpr);
     if(err) goto exit;
     if(!sxpr_elementp(sxpr, oxfr_hello)){
-        dprintf("> sxpr_elementp test failed\n");
+        wprintf("> sxpr_elementp test failed\n");
         err = -EINVAL;
         goto exit;
     }
@@ -507,7 +506,6 @@ int xfr_send_hello(Conn *conn){
                          XFR_PROTO_MINOR);
     if(err < 0) goto exit;
     IOStream_flush(conn->out);
-    dprintf("> xfr_response...\n");
     err = xfr_response(conn);
   exit:
     dprintf("< err=%d\n", err);
@@ -671,7 +669,6 @@ int xfr_send(Args *args, XfrState *state, Conn *xend, uint32_t addr, uint32_t po
     dprintf("> Xfr xfr_addr=%s:%d\n", inet_ntoa(xfr_addr), ntohs(xfr_port));
     err = Conn_connect(peer, flags, xfr_addr, xfr_port);
     if(err) goto exit;
-    printf("\n");
     XfrState_set_state(state, XFR_HELLO);
     // Send hello message.
     err = xfr_send_hello(peer);
@@ -685,10 +682,9 @@ int xfr_send(Args *args, XfrState *state, Conn *xend, uint32_t addr, uint32_t po
         int plain_bytes = lzi_stream_plain_bytes(zio);
         int comp_bytes = lzi_stream_comp_bytes(zio);
         float ratio = lzi_stream_ratio(zio);
-        dprintf("> Compression: plain %d bytes, compressed %d bytes, ratio %3.2f\n",
+        iprintf("> Compression: plain %d bytes, compressed %d bytes, ratio %3.2f\n",
                 plain_bytes, comp_bytes, ratio);
     }
-    printf("\n");
   exit:
     dprintf("> err=%d\n", err);
     if(err && !XfrState_get_err(state)){
@@ -697,7 +693,7 @@ int xfr_send(Args *args, XfrState *state, Conn *xend, uint32_t addr, uint32_t po
     Conn_close(peer);
     if(!err){
         t1 = time(NULL) - t0;
-        dprintf("> Transfer complete in %lu seconds\n", t1);
+        iprintf("> Transfer complete in %lu seconds\n", t1);
     }
     dprintf("> done err=%d, notifying xend...\n", err);
     xfr_send_done(state, xend);
@@ -709,15 +705,24 @@ int xfr_send(Args *args, XfrState *state, Conn *xend, uint32_t addr, uint32_t po
  */
 int xfr_save(Args *args, XfrState *state, Conn *xend, char *file){
     int err = 0;
+    int flags = (O_CREAT | O_EXCL | O_WRONLY);
+    int mode = 0644;
+    int fd;
     IOStream *io = NULL;
 
     dprintf("> file=%s\n", file);
-    io = file_stream_fopen(file, "wb");
-    if(!io){
-        dprintf("> Failed to open %s\n", file);
+    fd = open(file, flags, mode);
+    if(fd < 0) {
+        eprintf("> Failed to open %s\n", file);
         err = -EIO;
         goto exit;
     }
+    io = gzip_stream_fdopen(fd, "wb1");
+    if(!io){
+        eprintf("> Failed to allocate gzip state for %s\n", file);
+        err = -ENOMEM;
+        goto exit;
+    }
     err = xen_domain_snd(xend, io, state->vmid, state->vmconfig, state->vmconfig_n);
     if(err){
         err = xfr_error(xend, err);
@@ -729,6 +734,9 @@ int xfr_save(Args *args, XfrState *state, Conn *xend, char *file){
         IOStream_close(io);
         IOStream_free(io);
     }
+    if(err){
+        unlink(file);
+    }
     dprintf("< err=%d\n", err);
     return err;
 }
@@ -758,7 +766,7 @@ int xfr_recv(Args *args, XfrState *state, Conn *peer){
   exit:
     if(!err){
         t1 = time(NULL) - t0;
-        dprintf("> Transfer complete in %lu seconds\n", t1);
+        iprintf("> Transfer complete in %lu seconds\n", t1);
     }
     if(err){
         xfr_error(peer, err);
@@ -783,14 +791,14 @@ int xfrd_service(Args *args, int peersock, struct sockaddr_in peer_in){
     dprintf(">\n");
     err = Conn_init(conn, flags, peersock, peer_in);
     if(err) goto exit;
-    dprintf(">xfr_hello... \n");
+    //dprintf(">xfr_hello... \n");
     err = xfr_hello(conn);
     if(err) goto exit;
-    dprintf("> sxpr...\n");
+    //dprintf("> sxpr...\n");
     err = Conn_sxpr(conn, &sxpr);
     if(err) goto exit;
-    dprintf("> sxpr=\n");
-    objprint(iostdout, sxpr, PRINT_TYPE); IOStream_print(iostdout, "\n");
+    //dprintf("> sxpr=\n");
+    //objprint(iostdout, sxpr, PRINT_TYPE); IOStream_print(iostdout, "\n");
     if(sxpr_elementp(sxpr, oxfr_migrate)){
         // Migrate message from xend.
         uint32_t addr;
@@ -839,7 +847,7 @@ int xfrd_service(Args *args, int peersock, struct sockaddr_in peer_in){
     } else{
         // Anything else is invalid.
         err = -EINVAL;
-        dprintf("> Invalid message: ");
+        eprintf("> Invalid message: ");
         objprint(iostderr, sxpr, 0);
         IOStream_print(iostderr, "\n");
         xfr_error(conn, err);
@@ -1091,6 +1099,7 @@ int main(int argc, char *argv[]){
     int key = 0;
     int long_index = 0;
 
+    dprintf(">\n");
     set_defaults(args);
     while(1){
        key = getopt_long(argc, argv, short_opts, long_opts, &long_index);